"""
Main script to provide the Monte-Carlo sampling class extensions
Some tasks to consider doing soon
TODO: add option to add a secondary probability distribution to each parameter. This enables adaptive importance sampling
"""
import datetime
import importlib
import os
import numpy as np
from binarycpython.utils.functions import calculate_total_mass_system
# pylint: disable=E1101
_numba = False
comment_line = "###########\n"
[docs]class monte_carlo_sampling:
"""
Extension for the Population class containing the code for Monte-Carlo sampling functions
"""
def __init__(self, **kwargs):
"""
Init function for the spacing_functions class
"""
return
def _monte_carlo_sampling_cleanup(self):
"""
Cleanup function for the Monte-Carlo sampling evolution type
"""
self.population_options["_monte_carlo_threshold_reached"] = False
self.population_options["_monte_carlo_current_total_mass_evolved"] = 0
self.population_options["_monte_carlo_current_total_count_evolved"] = 0
def _monte_carlo_sampling_check_mass_threshold(self, system_dict):
"""
Function to handle checking the total mass evolved and signal to stop
"""
# Only if the monte_carlo_mass_threshold is positive (default = -1)
if self.population_options["monte_carlo_mass_threshold"] > 0:
# Add total mass of current system tot total mass evolved
self.population_options[
"_monte_carlo_current_total_mass_evolved"
] += calculate_total_mass_system(system_dict)
# Check if exceeds threshold
if (
self.population_options["_monte_carlo_current_total_mass_evolved"]
> self.population_options["monte_carlo_mass_threshold"]
):
self.population_options["_monte_carlo_threshold_reached"] = True
def _monte_carlo_sampling_check_count_threshold(self):
"""
Function to handle checking the total count evolved and signal to stop
"""
# Only if the monte_carlo_count_threshold is positive (default = -1)
if self.population_options["monte_carlo_count_threshold"] > 0:
# Add total mass of current system tot total count evolved
self.population_options["_monte_carlo_current_total_count_evolved"] += 1
# Check if exceeds threshold
if (
self.population_options["_monte_carlo_current_total_count_evolved"]
> self.population_options["monte_carlo_count_threshold"]
):
self.population_options["_monte_carlo_threshold_reached"] = True
def _monte_carlo_sampling_check_custom_threshold(self, result_queue):
"""
Function to handle checking the content of the result queue and look for a
The user-defined function has access to the result_queue and is responsible for processing (and emptying it)
"""
# Pass output of binary_c to a user-defined parsing function
if self.population_options["monte_carlo_custom_threshold_function"]:
self.population_options["monte_carlo_custom_threshold_function"](
self, result_queue
)
##############
# Generator functions
# Management
def _monte_carlo_sampling_get_generator_filename(self):
"""
Returns a filename of the generator file for the monte-carlo sampling
Returns:
filename (str): filename of the generator file for the monte-carlo sampling.
"""
if self.HPC_job():
filename = os.path.join(
self.population_options["tmp_dir"],
"binary_c_monte_carlo_sampling_generator_{population_id}.{jobid}.py".format(
population_id=self.population_options["_population_id"],
jobid=self.jobID(),
),
)
else:
filename = os.path.join(
self.population_options["tmp_dir"],
"binary_c_monte_carlo_sampling_generator_{population_id}.py".format(
population_id=self.population_options["_population_id"]
),
)
return filename
def _monte_carlo_sampling_write_generator_file(self, dry_run=False):
"""
Function to write the function that can generate the sample for the parameter
Contrary to the grid sampling variable generator code, this code is not nested, but is linear.
Args:
dry_run (bool, optional): flag to build with dry-run functionality.
"""
#########
# Set up header and start of function
self._add_code(
# Import packages
"import math\n",
"import copy\n",
"import numpy as np\n",
"from collections import OrderedDict\n",
"from binarycpython.utils.useful_funcs import *\n",
"\n\n",
# Make the function
"def monte_carlo_generator(self, print_results=True):\n",
)
# Increase indent_depth
self._increment_indent_depth(+1)
# Code to set up some parameters
self._add_code(
# Write some info in the function
"# Grid code generated on {}\n".format(datetime.datetime.now().isoformat()),
"# This function generates the systems that will be evolved with binary_c\n\n",
# Set some values in the generated code:
"# Set initial values\n",
"_total_starcount = 0\n",
"generating = True\n",
"default_system_dict = {'multiplicity': 0}\n\n",
)
# Add method for the pre-calculation of the arrays
if self.population_options["monte_carlo_use_pre_calculated_distributions"]:
self._add_code("self.handle_pre_calc(self)\n")
#########
# Set up the loop and yield calls
self._add_code(
# Comment
comment_line,
"# Start of the while loop\n",
# while loop
"while generating:\n",
)
# Increase indent_depth
self._increment_indent_depth(+1)
self._add_code("system_dict = copy.copy(default_system_dict)\n\n")
#########
# Run loop of sampling variable setup
for loopnr, sampling_variable_el in enumerate(
sorted(
self.population_options["_sampling_variables"].items(),
key=lambda x: x[1]["sampling_variable_number"],
)
):
sampling_variable = sampling_variable_el[1]
#########
# Generate parameter lines
self._monte_carlo_sampling_write_generator_parameter(sampling_variable)
#########
# Generate yield call if branchpoint
self._monte_carlo_sampling_write_system_call(
sampling_variable,
dry_run,
sampling_variable["branchpoint"],
sampling_variable["branchcode"],
)
#########
# Write to file
monte_carlo_sampling_generator_filename = (
self._monte_carlo_sampling_get_generator_filename()
)
self.population_options[
"_monte_carlo_sampling_generator_filename"
] = monte_carlo_sampling_generator_filename
self.vb_info(
"{blue}Write grid code to {file} {reset}".format(
blue=self.ANSI_colours["blue"],
file=monte_carlo_sampling_generator_filename,
reset=self.ANSI_colours["reset"],
),
)
with self.open(
monte_carlo_sampling_generator_filename, "w", encoding="utf-8"
) as file:
file.write(self.code_string)
self.code_string = ""
# Construction
def _monte_carlo_sampling_write_system_call(
self, sampling_variable, dry_run, branchpoint, branchcode
):
"""
Function to write the block of code (as string) that handles the setting the final probability, taking into account the weight and repeat settings, incrementing the total starcount and total probability.
Then if the run is a dry run we implement the dry_run_hook or pass depending on the settings. If it is not a dry run we yield the system dict
"""
if branchpoint:
self._add_code(comment_line)
if branchcode:
self._add_code(
"# Branch code\n",
"if {branchcode}:\n".format(branchcode=branchcode),
)
self._increment_indent_depth(+1)
# Factor in the custom weight input
self._add_code(
"\n",
"# Loop over the repeats\n",
'for _ in range(self.population_options["repeat"]):' + "\n",
)
self._add_code(
"_total_starcount += 1\n",
indent=1,
)
if not dry_run:
# Handle what is returned, or what is not.
self._add_code("yield system_dict\n\n", indent=1)
# If its a dry run, dont do anything with it
else:
# run the hook function, only if given
if self.population_options["dry_run_hook"]:
self._add_code(
"self.population_options['dry_run_hook'](self, system_dict)\n\n",
indent=1,
)
else:
# or pass
self._add_code("pass\n\n", indent=1)
self._increment_indent_depth(-1)
return self.code_string
def _monte_carlo_sampling_write_generator_parameter(self, sampling_variable):
"""
Function to write the parameter lines and the pre and postcode around it to the monte carlo sampling generator function.
"""
self.vb_debug(
"Constructing/adding: {}".format(sampling_variable["parameter_name"]),
)
####################
# Write comment
self._add_code(
comment_line, "# Sampling variable {}\n\n".format(sampling_variable["name"])
)
self.vb_debug(
"Writing sampling variable {} to monte-carlo sampling generator".format(
sampling_variable["parameter_name"]
),
)
####################
# Generate top code
if sampling_variable["topcode"]:
self._add_code(
comment_line,
sampling_variable["topcode"],
"\n\n",
)
#########
# Generate call to pdf-cdf function to generate the parameter value
self._add_code(
comment_line,
"{} = self.handle_calc_sampled_value(self, sampling_variable={}{})\n".format(
sampling_variable["parameter_name"],
sampling_variable,
", "
+ ",".join(
[
"{}={}".format(dependency_variable, dependency_variable)
for dependency_variable in sampling_variable[
"dependency_variables"
]
]
)
if sampling_variable["dependency_variables"]
else "",
),
"system_dict['{}'] = {}\n\n".format(
sampling_variable["parameter_name"], sampling_variable["parameter_name"]
),
)
####################
# bottom code
if sampling_variable["bottomcode"]:
self._add_code(
comment_line,
sampling_variable["bottomcode"] + "\n\n",
)
##############
# Sampling functions
# Management
def _monte_carlo_sampling_get_sampling_functions_filename(self):
"""
Returns a filename for the file containing the function to generate values for the variables.
"""
filename = os.path.join(
self.population_options["tmp_dir"],
"binary_c_monte_carlo_sampling_sampling_functions_{population_id}.py".format(
population_id=self.population_options["_population_id"]
),
)
return filename
def _monte_carlo_sampling_load_sampling_functions_file(self):
"""
Function to load the sampling functions
"""
# Code to load the generator code
self.vb_debug(
message="Load monte-carlo sampling functions from {file}".format(
file=self.population_options[
"_monte_carlo_sampling_sampling_functions_filename"
]
),
)
# Load the module from file and the functions in it.
spec = importlib.util.spec_from_file_location(
"binary_c_python_monte_carlo_sampling_functions",
os.path.join(
self.population_options[
"_monte_carlo_sampling_sampling_functions_filename"
]
),
)
monte_carlo_sampling_functions_file = importlib.util.module_from_spec(spec)
spec.loader.exec_module(monte_carlo_sampling_functions_file)
# NOTE: the way we are loading and dynamically is i think not fully correct. I have to pass the self in twice.
# TODO: fix the setting of these functions properly
# Create dictionary to hold the calc_sampled_value functions
self.monte_carlo_calc_sampled_value_functions = {}
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith("calc_sampled_value"):
# Set function as class function
setattr(self, obj, getattr(monte_carlo_sampling_functions_file, obj))
# Add to dict
self.monte_carlo_calc_sampled_value_functions[
obj.replace("calc_sampled_value_", "")
] = getattr(monte_carlo_sampling_functions_file, obj)
# Create dictionary to hold the calc_pdf_cdf_value_array_dict functions
self.monte_carlo_calc_pdf_cdf_value_array_dict_functions = {}
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith("calc_pdf_cdf_value_array_dict"):
# Set function as class function
setattr(self, obj, getattr(monte_carlo_sampling_functions_file, obj))
# Add to dict
self.monte_carlo_calc_pdf_cdf_value_array_dict_functions[
obj.replace("calc_pdf_cdf_value_array_dict_", "")
] = getattr(monte_carlo_sampling_functions_file, obj)
# Load calc_sampled_value handler function
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith("handle_calc_sampled_value"):
self.handle_calc_sampled_value = getattr(
monte_carlo_sampling_functions_file, obj
)
break
# Load pre_calc handler function
if self.population_options["monte_carlo_use_pre_calculated_distributions"]:
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith("handle_pre_calc"):
self.handle_pre_calc = getattr(
monte_carlo_sampling_functions_file, obj
)
# setattr(self, obj, getattr(monte_carlo_sampling_functions_file, obj))
# MethodType(func, obj)
# MethodType(getattr(monte_carlo_sampling_functions_file, obj), self)
break
def _monte_carlo_sampling_write_sampling_functions_file(self, dry_run=False):
"""
Function to write the functions that handle generating the values for each variable
"""
#########
# Set up header and start of function
self._add_code(
# Import packages
"import math\n",
"import copy\n",
"import numpy as np\n",
"from collections import OrderedDict\n",
"from binarycpython.utils.useful_funcs import *\n",
"\n\n",
)
#########
# Write functions to calculate the values for each parameter.
for loopnr, sampling_variable_el in enumerate(
sorted(
self.population_options["_sampling_variables"].items(),
key=lambda x: x[1]["sampling_variable_number"],
)
):
sampling_variable = sampling_variable_el[1]
######################
# Call to function to write the calc_sample_value function
self._monte_carlo_sampling_write_calc_pdf_cdf_value_array_dict_function(
sampling_variable
)
######################
# Call to function to write the calc_sample_value function
self._monte_carlo_sampling_write_calc_sampled_value_function(
sampling_variable
)
######################
# Call to function to write the pre_calc_sample_value function
if self.population_options["monte_carlo_use_pre_calculated_distributions"]:
self._monte_carlo_sampling_write_pre_calc_function(sampling_variable)
######################
# Call to function to write the handle_calc_sample_value wrapper function
self._monte_carlo_sampling_write_handle_calc_sampled_value_function()
######################
# Call to function to write the handle_pre_calc function and other functions
if self.population_options["monte_carlo_use_pre_calculated_distributions"]:
self._monte_carlo_sampling_write_handle_pre_calc_function()
self._monte_carlo_sampling_write_bin_function()
self._monte_carlo_sampling_write_center_function()
#########
# Write to file
monte_carlo_sampling_sampling_functions_filename = (
self._monte_carlo_sampling_get_sampling_functions_filename()
)
self.population_options[
"_monte_carlo_sampling_sampling_functions_filename"
] = monte_carlo_sampling_sampling_functions_filename
self.vb_debug(
"{blue}Write sampling functions code to {file} {reset}".format(
blue=self.ANSI_colours["blue"],
file=monte_carlo_sampling_sampling_functions_filename,
reset=self.ANSI_colours["reset"],
),
)
with self.open(
monte_carlo_sampling_sampling_functions_filename, "w", encoding="utf-8"
) as file:
file.write(self.code_string)
# Reset indentation
self.code_string = ""
# Construction
def _monte_carlo_sampling_write_pdf_cdf_value_array_block(self, sampling_variable):
"""
Function to write the code-block for the pdf/cdf value array functions
"""
#######
# Set up lists
self._add_code(
comment_line,
"value_array = {}\n".format(
sampling_variable["samplerfunc"]
), # Line for the sampler func value array sampling (not the same as parameter value array)
"parameter_value_array = []\n",
"probability_array = [0]\n\n",
)
#######
# Set up first parameter value array adding
self._add_code(
comment_line,
"{} = value_array[0]\n".format(sampling_variable["name"]),
"{}\n".format(
sampling_variable["precode"].replace("\n", "\n" + self._indent_block(0))
)
if sampling_variable["precode"]
else "\n",
"parameter_value_array.append({})\n\n".format(
sampling_variable["parameter_name"]
),
)
#######
# Set up loop for sampling the distribution function
self._add_code(
comment_line,
"for {} in value_array[1:]:\n".format(sampling_variable["name"]),
)
self._increment_indent_depth(+1)
#######
# Set up conversion with precode
self._add_code(
comment_line,
"{}\n".format(
sampling_variable["precode"].replace("\n", "\n" + self._indent_block(0))
)
if sampling_variable["precode"]
else "\n",
"parameter_value_array.append({})\n\n".format(
sampling_variable["parameter_name"]
),
)
#######
# Set up calculation of probability
# TODO: implement the call to a second pdf here (could be anything, but perhaps a likelihood or AIS)
self._add_code(
comment_line,
"probability = {}\n".format(sampling_variable["probdist"]),
"probability_array.append(probability)\n\n",
)
self._increment_indent_depth(-1)
#######
# Construct method to calculate pdf/cdf and value array
self._add_code(
comment_line,
"probability_array = np.array(probability_array)\n",
"cdf_array = np.cumsum(probability_array) / np.sum(probability_array)\n",
"parameter_value_array = np.array(parameter_value_array)\n\n",
)
#######
# Construct pdf/cdf and array dict and return
self._add_code(
comment_line,
"pdf_cdf_value_array_dict = {'probability_array': probability_array, 'cdf_array': cdf_array, 'value_array': value_array, 'parameter_value_array': parameter_value_array}\n\n",
)
def _monte_carlo_sampling_write_bin_function(self):
"""
Function to write the bin function
"""
# self._add_code(
# comment_line,
# "def bin(parameter_value_array, parameter_value):\n",
# " index = np.digitize(parameter_value, bins=parameter_value_array, right=False)-1\n",
# " center_value = (parameter_value_array[index+1]+parameter_value_array[index])/2\n",
# " return center_value\n",
# )
self._add_code(
comment_line,
"def bin(parameter_value_array, parameter_value):\n",
" index = np.digitize(parameter_value, bins=parameter_value_array, right=False)-1\n",
" binned_value = parameter_value_array[index+1]\n",
" return binned_value\n",
)
def _monte_carlo_sampling_write_center_function(self):
"""
Function to write the center function
"""
self._add_code(
comment_line,
"def center(sampling_func):\n",
" value_array = sampling_func\n",
" center_value_array = (value_array[1:]+value_array[:-1])/2\n",
" return center_value_array\n",
)
def _monte_carlo_sampling_write_handle_pre_calc_function(self):
"""
Function to write then handler function for the calc_sampled_value
"""
#######
# Construct function
self._add_code(
"def handle_pre_calc(self):\n".format(),
)
self._increment_indent_depth(+1)
#######
# Initialise dict
self._add_code(
"self.pre_calculated_pdf_cdf_value_array_dicts = {}\n",
)
#######
# Loop over variables and call them to set the pre-calculated pdf cdf information in the dictionary
for loopnr, sampling_variable_el in enumerate(
sorted(
self.population_options["_sampling_variables"].items(),
key=lambda x: x[1]["sampling_variable_number"],
)
):
sampling_variable = sampling_variable_el[1]
#######
# Write call to specific functions to pre-calculate each pdf/cdf dict
self._add_code(
"self.pre_calculated_pdf_cdf_value_array_dicts['{}'] = pre_calculate_pdf_cdf_value_array_dict_{}(self)\n".format(
sampling_variable["parameter_name"],
sampling_variable["parameter_name"],
),
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code("\n\n")
def _monte_carlo_sampling_write_calc_sampled_value_function(
self, sampling_variable
):
"""
Function to write the calc_sampled_value call
"""
#######
# Construct function
self._add_code(
"def calc_sampled_value_{}(self{}):\n".format(
sampling_variable["parameter_name"],
", "
+ ",".join(
[
"{}".format(dependency_variable)
for dependency_variable in sampling_variable[
"dependency_variables"
]
]
)
if sampling_variable["dependency_variables"]
else "",
),
)
self._increment_indent_depth(+1)
#######
# Call to pdf_cdf_value_array_dict function
self._add_code(
comment_line,
"if self.population_options['monte_carlo_use_pre_calculated_distributions']:\n",
)
######
# Add calls to the dependency variable's pre-calculated parameter_value_array
if sampling_variable["dependency_variables"]:
for dependency_variable in sampling_variable["dependency_variables"]:
# Create string for getting the parameter value array
string = "self.pre_calculated_pdf_cdf_value_array_dicts['{}']".format(
dependency_variable
)
if self.population_options["_sampling_variables_parameter_names"][
dependency_variable
]["dependency_variables"]:
for depth in range(
0,
len(
self.population_options[
"_sampling_variables_parameter_names"
][dependency_variable]["dependency_variables"]
),
):
string += "['{{:.6f}}'.format(binned_{})]".format(
sampling_variable["dependency_variables"][depth]
)
# Add code to get the binned value of the current dependency value
self._add_code(
" binned_{} = bin({}['parameter_value_array'], {})\n".format(
dependency_variable, string, dependency_variable
)
)
self._add_code(
" pdf_cdf_value_array_dict = self.pre_calculated_pdf_cdf_value_array_dicts['{}']{}\n".format(
sampling_variable["parameter_name"],
"".join(
[
"['{{:.6f}}'.format(binned_{})]".format(dependency_variable)
for dependency_variable in sampling_variable[
"dependency_variables"
]
]
)
if sampling_variable["dependency_variables"]
else "",
),
"else:\n",
" pdf_cdf_value_array_dict = self.monte_carlo_calc_pdf_cdf_value_array_dict_functions['{}'](self{})\n\n".format(
sampling_variable["parameter_name"],
", "
+ ",".join(
[
"{}".format(dependency_variable)
for dependency_variable in sampling_variable[
"dependency_variables"
]
]
)
if sampling_variable["dependency_variables"]
else "",
),
comment_line,
"cdf_array = pdf_cdf_value_array_dict['cdf_array']\n",
"value_array = pdf_cdf_value_array_dict['value_array']\n",
"parameter_value_array = pdf_cdf_value_array_dict['parameter_value_array']\n\n",
)
#######
# Get random sample
self._add_code(
comment_line,
"random_sample = np.random.uniform(low=0.0, high=1.0, size=(1))\n\n",
)
#######
# Calculate indices and edge points
self._add_code(
comment_line,
"left_index = np.digitize(random_sample, bins=cdf_array, right=False)-1\n",
"right_index = left_index + 1\n\n",
)
#######
# Get probabilities of bin-edges and set
self._add_code(
comment_line,
"left_cdf = cdf_array[left_index]\n",
"right_cdf = cdf_array[right_index]\n",
"dcdf = right_cdf - left_cdf\n\n",
)
#######
# Get values of bin-edges and set
self._add_code(
comment_line,
"left_value = parameter_value_array[left_index]\n",
"right_value = parameter_value_array[right_index]\n",
"dvalue = right_value - left_value\n\n",
)
#######
# Calculate new point by interpolation
self._add_code(
comment_line,
"dvalue_dcdf = dvalue/dcdf\n",
"dist_to_left = random_sample - left_cdf\n",
"{} = left_value + (dvalue_dcdf) * dist_to_left\n\n".format(
sampling_variable["parameter_name"]
),
)
#######
# Turn the new point into the actual parameter if we have post-code
if sampling_variable["postcode"]:
self._add_code(
comment_line,
sampling_variable["postcode"] + "\n\n",
)
#######
# return value
self._add_code(
comment_line,
"return {}[0]\n".format(sampling_variable["parameter_name"]),
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code("\n\n")
def _monte_carlo_sampling_write_calc_pdf_cdf_value_array_dict_function(
self, sampling_variable
):
"""
Function to write the calc_pdf_cdf_value_array_dict_function call
"""
#######
# Construct function
self._add_code(
"def calc_pdf_cdf_value_array_dict_{}(self{}):\n".format(
sampling_variable["parameter_name"],
", "
+ ",".join(
[
"{}".format(dependency_variable)
for dependency_variable in sampling_variable[
"dependency_variables"
]
]
)
if sampling_variable["dependency_variables"]
else "",
),
)
self._increment_indent_depth(+1)
#######
# Write the pdf cdf value array code block
self._monte_carlo_sampling_write_pdf_cdf_value_array_block(sampling_variable)
#######
# Construct pdf/cdf and array dict and return
self._add_code(
"return pdf_cdf_value_array_dict\n",
)
#######
# De-dent and pad
self._increment_indent_depth(-1)
self._add_code("\n\n")
def _monte_carlo_sampling_write_handle_calc_sampled_value_function(self):
"""
Function to write then handler function for the calc_sampled_value
"""
#######
# Construct function
self._add_code(
"def handle_calc_sampled_value(self, sampling_variable, **kwargs):\n".format(),
)
self._increment_indent_depth(+1)
#######
# Get correct function and call the function with the arguments
self._add_code(
"calc_sampled_value_function = self.monte_carlo_calc_sampled_value_functions[sampling_variable['parameter_name']]\n",
"sampled_value = calc_sampled_value_function(self, **kwargs)\n\n",
"return sampled_value\n",
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code("\n\n")
def _monte_carlo_sampling_write_pre_calc_function(self, sampling_variable):
"""
Function to write the pre_calc function for each variable
"""
#######
# Construct function
self._add_code(
"def pre_calculate_pdf_cdf_value_array_dict_{}(self):\n".format(
sampling_variable["parameter_name"]
),
)
self._increment_indent_depth(+1)
#######
# Handle construction if we have dependency variables
if sampling_variable["dependency_variables"]:
#######
# Construct nested pdf cdf dict
self._add_code(
comment_line,
"nested_pdf_cdf_value_array_dict = {}\n\n",
)
sampling_variable_dict_based_on_parameter_name = {
sampling_variable_el["parameter_name"]: sampling_variable_el
for sampling_variable_el in self.population_options[
"_sampling_variables"
].values()
}
dependency_variable_list = [
sampling_variable_dict_based_on_parameter_name[dependency_variable_key]
for dependency_variable_key in sampling_variable["dependency_variables"]
]
#######
# Loop over all dependency variables
for dependency_variable_i, dependency_variable_key in enumerate(
sampling_variable["dependency_variables"]
):
dependency_variable = sampling_variable_dict_based_on_parameter_name[
dependency_variable_key
]
################
# Create dict if we are in
if (
not dependency_variable_i
== len(sampling_variable["dependency_variables"])
) and (not dependency_variable_i == 0):
self._add_code(
comment_line,
"nested_pdf_cdf_value_array_dict['{{:.6f}}'.format({})] = {{}}\n".format(
sampling_variable["dependency_variables"][
dependency_variable_i - 1
]
),
)
#######
# Construct the loop over the
self._add_code(
comment_line,
"value_array_{} = {}\n\n".format(
dependency_variable["name"], dependency_variable["samplerfunc"]
),
)
#######
# Set up loop for sampling the distribution function
# NOTE: previously there was a center statement here (for {} in center(value_array_{})). Not sure why i did that. I removed it now.
self._add_code(
comment_line,
"for {} in value_array_{}:\n\n".format(
dependency_variable["name"], dependency_variable["name"]
),
)
self._increment_indent_depth(+1)
self._add_code(
"{}\n".format(
dependency_variable["precode"].replace(
"\n", "\n" + self._indent_block(0)
)
)
if dependency_variable["precode"]
else "\n"
)
#######
# Construct parts to build the pdf cdf function
self._monte_carlo_sampling_write_pdf_cdf_value_array_block(
sampling_variable
)
#######
# Write code to store the dict in the nested dict
nested_dict_calls = "".join(
[
"['{{:.6f}}'.format({})]".format(
cur_dependency_variable["parameter_name"]
)
for cur_dependency_variable in dependency_variable_list
]
)
self._add_code(
comment_line,
"nested_pdf_cdf_value_array_dict{} = pdf_cdf_value_array_dict\n\n".format(
nested_dict_calls
),
)
#######
# de-dent the code
for dependency_variable_i, dependency_variable_key in enumerate(
sampling_variable["dependency_variables"][::-1]
):
self._increment_indent_depth(-1)
#######
# Construct dictionary to hold the other dictionaries
self._add_code(
"return nested_pdf_cdf_value_array_dict\n",
)
#######
# Handle construction if we don't have dependency variables
else:
#######
# Construct parts to build the pdf cdf function
self._monte_carlo_sampling_write_pdf_cdf_value_array_block(
sampling_variable
)
#######
# Write return statement
self._add_code(
"return pdf_cdf_value_array_dict\n",
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code("\n\n")
# Misc
def _sample_multiplicity(self, system_dict):
"""
Function to calculate the multiplicity based on grid choices and random guess
"""
##############
# If we choose to use no multiplicity fraction function then we always return multiplicity of 1
if self.population_options["multiplicity_fraction_function"] in [0, "None"]:
return 1
# Otherwise we will get the multiplicity fraction dict sample a random value to determine which multiplicty we have
multiplicity_dict = self._get_multiplicity_dict(system_dict)
# TODO: this can probably be a oneliner
multiplicity_list, probability_list = [], []
for multiplicity, probability in sorted(
multiplicity_dict.items(), key=lambda x: x[0]
):
multiplicity_list.append(multiplicity)
probability_list.append(probability)
multiplicity_array = np.array(multiplicity_list)
probability_array = np.array(probability_list)
##############
# Calculate CDF and sample
probability_cdf = np.cumsum(probability_array)
random_number = np.random.uniform()
indices = np.digitize(random_number, bins=probability_cdf, right=False)
# Get multiplicity and return
multiplicity = multiplicity_array[indices]
return multiplicity
##############
# Management functions
def _monte_carlo_sampling_load_generator(self):
"""
Function to load the monte_carlo grid
"""
# Code to load the generator code
self.vb_info(
message="Load monte-carlo generator function from {file}".format(
file=self.population_options["_monte_carlo_sampling_generator_filename"]
),
)
spec = importlib.util.spec_from_file_location(
"binary_c_python_monte_carlo_sampling_generator",
os.path.join(
self.population_options["_monte_carlo_sampling_generator_filename"]
),
)
monte_carlo_sampling_generator_file = importlib.util.module_from_spec(spec)
spec.loader.exec_module(monte_carlo_sampling_generator_file)
generator = monte_carlo_sampling_generator_file.monte_carlo_generator
self.population_options["_monte_carlo_sampling_generator"] = generator
self.vb_info("Monte-carlo system generator")
def _monte_carlo_sampling_setup(self):
"""
Function to prepate the class for a monte-carlo sampling simulation
"""
self.vb_info("setup mc sampling")
# Put in check
if len(self.population_options["_sampling_variables"]) == 0:
msg = "Error: you haven't defined any sampling variables! Aborting"
raise ValueError(msg)
# Make a dictionary with the names of the sampling variables
self.population_options["_sampling_variables_parameter_names"] = {
sampling_variable[1]["parameter_name"]: sampling_variable[1]
for sampling_variable in self.population_options[
"_sampling_variables"
].items()
}
def _monte_carlo_sampling_get_generator(self):
"""
Function to get the generator for the source_file sampling method. Called by _get_generator and used in the actual evolution loop.
"""
# Write the sampling functions file
self._monte_carlo_sampling_write_sampling_functions_file()
# Load the sampling functions
self._monte_carlo_sampling_load_sampling_functions_file()
# Write generator file
self._monte_carlo_sampling_write_generator_file()
# Load generator
self._monte_carlo_sampling_load_generator()
# Get generator file
generator = self.population_options["_monte_carlo_sampling_generator"](
self, print_results=False
)
return generator
##############
# misc
##############
# Writing binary_c calls to file:
[docs] def monte_carlo_sampling_write_binary_c_calls_to_file(
self, output_file, include_defaults
):
"""
Function to write the generated grid to a file.
TODO: this can be merged with the original write_binary_c_calls to file ... Have the get_generator handle the setup
"""
#######
# Start up the generator
generator = self._get_generator()
if not self.population_options["monte_carlo_mass_threshold"] > 0:
raise ValueError("Cant write MC sampling to file without a mass threshold")
# then if the _system_generator is present, we go through it
if generator:
# Write to file
with self.open(output_file, "w", encoding="utf-8") as file:
# Get defaults and clean them, then overwrite them with the set values.
if include_defaults:
# TODO: make sure that the defaults here are cleaned up properly
cleaned_up_defaults = self.cleaned_up_defaults
full_system_dict = cleaned_up_defaults.copy()
full_system_dict.update(self.bse_options.copy())
else:
full_system_dict = self.bse_options.copy()
for system_number, system_dict in enumerate(generator):
# update values with current system values
full_system_dict.update(system_dict)
######
# Handle monte-carlo threshold based on evolved mass
if self.population_options["evolution_type"] == "monte_carlo":
# Check based on mass threshold
self._monte_carlo_sampling_check_mass_threshold(
full_system_dict
)
######
# Check if evolution threshold is reached.
if self.population_options["_monte_carlo_threshold_reached"]:
break
#
binary_cmdline_string = self._return_argline(full_system_dict)
file.write(binary_cmdline_string + "\n")
else:
self.vb_error("Error. No grid function found!")
raise ValueError
return output_file